package com.migrate.module.util;

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.migrate.module.domain.BinlogData;

import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.*;

/**
 * MySQL binlog解析工具类
 *
 * @author zhonghuashishan
 */
public abstract class BinlogUtils
{
    /**
     * 实体类所在包，这里实际上应该配置在配置文件里
     */
    private static final String DOMAIN_PATH = "com.migrate.module.domain";
    /**
     * binlog的data数组里数据的类型为实体类：domain
     */
    private static final String DATATYPE_DOMAIN = "domain";
    /**
     * binlog的data数组里数据的类型为map：map
     */
    private static final String DATATYPE_MAP = "map";
    /**
     * 从MySQL的binlog json字符串获取BinlogData对象(表数据以实体类Map的形式返回)
     * @param binlogStr binlog json字符串
     * @return BinlogData对象
     */
    public static BinlogData getBinlogDataMap (String binlogStr) throws ClassNotFoundException, InstantiationException, IllegalAccessException
    {
        return getBinlogData (binlogStr, DATATYPE_MAP);
    }
    /**
     * 从MySQL的binlog json字符串获取BinlogData对象(表数据以实体类List的形式返回)
     * @param binlogStr binlog json字符串
     * @return BinlogData对象
     */
    public static BinlogData getBinlogDataList (String binlogStr) throws ClassNotFoundException, InstantiationException, IllegalAccessException
    {
        return getBinlogData (binlogStr, DATATYPE_DOMAIN);
    }

    /**
     * 解析binlog json字符串
     * @param binlogStr binlog json字符串
     * @param dataType 解析后的data的类型（实体类还是map）
     * @return BinlogData
     * @throws ClassNotFoundException 找不到实体类异常
     * @throws InstantiationException 实例化实体类异常
     * @throws IllegalAccessException 非法访问异常
     */
    private static BinlogData getBinlogData (String binlogStr, String dataType) throws ClassNotFoundException, InstantiationException, IllegalAccessException
    {
        // isJson方法里面会判断字符串是不是为空，所以这里不需要重复判断
        if (JSONUtil.isJson(binlogStr))
        {
            JSONObject binlogJson = JSONUtil.parseObj(binlogStr);
            BinlogData binlogData = new BinlogData();
            // 表名
            String tableName = binlogJson.getStr("table");
            binlogData.setTableName(tableName);
            // 操作类型
            String operateType = binlogJson.getStr("type");
            binlogData.setOperateType(operateType);
            // 操作时间
            Long operateTime = binlogJson.getLong("ts");
            binlogData.setOperateTime(operateTime);
            // 获取数据json数组
            JSONArray dataArray = binlogJson.getJSONArray("data");
            if (null != dataArray) {

                Iterable <JSONObject> dataArrayIterator = dataArray.jsonIter();
                // 遍历data节点并反射生成对象
                if (null != dataArrayIterator){
                    // binlog的data数组里数据的类型为实体类
                    if (DATATYPE_DOMAIN.equals(dataType))
                    {
                        // 获取实体类名称
                        String domainName = DOMAIN_PATH + '.' + StrUtil.upperFirst(StrUtil.toCamelCase(tableName));
                        // 获取表对应的实体类（这里出现异常就抛出去了，实际使用时应该捕获并记录日志，因为根据表名找不到对象，那么这个表的所有数据都无法同步，这种情况肯定要记录日志并告警的）
                        Class<?> domainClass= Class.forName(domainName);
                        List<Object> datas = new ArrayList<>();
                        while (dataArrayIterator.iterator().hasNext()){
                            JSONObject jsonObject = dataArrayIterator.iterator().next();
                            Field[] fields = domainClass.getDeclaredFields();
                            // 通过反射创建实体类实例（这里的异常也直接外抛了，实际处理时需要记录这个异常并告警）
                            Object domain = domainClass.newInstance();
                            for (Field field : fields)
                            {
                                // 根据属性名称反向取得对应的表中的字段名称，然后根据属性的类型取得字段值并通过set方法设置进去
                                String fieldName = field.getName();
                                String columnName = StrUtil.toSymbolCase(fieldName, '_');
                                // 因为我们的属性是私有的，所以这里需要设置为可访问方便直接设值
                                field.setAccessible(true);
                                Object fieldValue = getFieldValue (field.getType(), columnName, jsonObject);
                                if (null != fieldValue)
                                {
                                    field.set(domain, fieldValue);
                                }
                            }
                            datas.add(domain);
                        }
                        binlogData.setDatas(datas);
                    }
                    else if (DATATYPE_MAP.equals(dataType))
                    {
                        // binlog的data数组里数据的类型为Map
                        List<Map<String, Object>> dataMap = new ArrayList<>();
                        while (dataArrayIterator.iterator().hasNext()){
                            JSONObject jsonObject = dataArrayIterator.iterator().next();
                            Map <String, Object> data = new HashMap<>();
                            jsonObject.keySet().forEach(key -> {
                                String camelKey = StrUtil.toCamelCase(StrUtil.lowerFirst(key));
                                data.put(camelKey, jsonObject.get(key));
                            });
                            dataMap.add(data);
                        }
                        binlogData.setDataMap(dataMap);
                    }
                }
            }
            return binlogData;
        }
        return null;
    }

    /**
     * 从JSONObject取得对应的属性类型的值(不需要考虑里面的自定义对象、集合对象，因为一条binlog里面的一个data就是一个普通对象)
     * @param fieldType 属性类型
     * @param jsonObject JSONObject
     * @return 属性值
     */
    private static Object getFieldValue(Class<?> fieldType, String columnName, JSONObject jsonObject)
    {
        Object fieldValue = null;
        if (String.class.equals(fieldType))
        {
            fieldValue = jsonObject.getStr(columnName);
        }
        else if (Character.class.equals(fieldType))
        {
            fieldValue = jsonObject.getChar(columnName);
        }
        else if (Integer.class.equals(fieldType))
        {
            fieldValue = jsonObject.getInt(columnName);
        }
        else if (Long.class.equals(fieldType))
        {
            fieldValue = jsonObject.getLong(columnName);
        }
        else if (Double.class.equals(fieldType))
        {
            fieldValue = jsonObject.getDouble(columnName);
        }
        else if (Float.class.equals(fieldType))
        {
            fieldValue = jsonObject.getFloat(columnName);
        }
        else if (Short.class.equals(fieldType))
        {
            fieldValue = jsonObject.getShort(columnName);
        }
        else if (Date.class.equals(fieldType))
        {
            fieldValue = jsonObject.getDate(columnName);
        }
        else if (BigDecimal.class.equals(fieldType))
        {
            fieldValue = jsonObject.getBigDecimal(columnName);
        }
        else if (Boolean.class.equals(fieldType))
        {
            fieldValue = jsonObject.getBool(columnName);
        }
        else if (Byte.class.equals(fieldType))
        {
            fieldValue = jsonObject.getByte(columnName);
        }
        return fieldValue;
    }
}
